18.5 执行
M执行G并发任务有两个起点:线程启动函数mstart,还有就是stopm休眠唤醒后再度恢复调度循环。
让我们从头开始。
proc1.go
func mstart() { g := getg() // 确定栈边界 if g.stack.lo == 0 { // 对于无法使用 g0 stack 的系统,直接在系统堆栈上划出所需空间 size := g.stack.hi if size == 0 { size = 8192 * stackGuardMultiplier } // 通过取 size 变量指针来确定高位地址 g.stack.hi = uintptr(noescape(unsafe.Pointer(&size))) g.stack.lo = g.stack.hi - size + 1024 } g.stackguard0 = g.stack.lo + _StackGuard g.stackguard1 = g.stackguard0 mstart1() } func mstart1() { g := getg() if g != g.m.g0 { throw(“bad runtime•mstart”) } // 初始化 g0 执行现场 gosave(&g.m.g0.sched) g.m.g0.sched.pc = ^uintptr(0) // make sure it is never used // 执行启动函数 if fn := g.m.mstartfn; fn != nil { fn() } // 在 GC startTheWorld 时,会检查闲置 M 是否少于并发标记需求(needaddgcproc) // 新建 M,设置 m.helpgc = -1,加入闲置队列等待唤醒 if g.m.helpgc != 0 { g.m.helpgc = 0 stopm() } else if g.m != &m0 { // 绑定 P acquirep(g.m.nextp.ptr()) g.m.nextp = 0 } // 进入任务调度循环(不再返回) schedule() }
准备进入工作状态的M必须绑定一个有效P,nextp临时持有待绑定的P对象。因为在未正式执行前,并不适合直接设置相关属性。P为M提供cache,以便为工作线程提供对象内存分配。
proc1.go
func acquirep(p *p) { acquirep1(p) // 绑定 mcache g := getg() g.m.mcache = p.mcache } func acquirep1(p *p) { g := getg() g.m.p.set(p) p.m.set(g.m) p.status = _Prunning }
一切就绪后,M进入核心调度循环,这是一个由schedule、execute、goroutine fn、goexit函数构成的逻辑循环。就算M在休眠唤醒后,也只是从“断点”恢复。
proc1.go
func schedule() { g := getg() top: // 准备进入 GC STW,休眠 if sched.gcwaiting != 0 { gcstopm() goto top } var gp *g // 当从 P.next 提取 G 时,inheritTime = true // 不累加 P.schedtick 计数,使得它延长本地队列处理时间 var inheritTime bool // 进入 GC MarkWorker 工作模式 if gp == nil && gcBlackenEnabled != 0 { gp = gcController.findRunnableGCWorker(g.m.p.ptr()) if gp != nil { resetspinning() } } // 每处理 n 个任务后就去全局队列获取 G 任务,以确保公平 if gp == nil { if g.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(g.m.p.ptr(), 1) unlock(&sched.lock) if gp != nil { resetspinning() } } } // 从 P 本地队列获取 G 任务 if gp == nil { gp, inheritTime = runqget(g.m.p.ptr()) if gp != nil && g.m.spinning { throw(“schedule: spinning with local work”) } } // 从其他可能的地方获取 G 任务 // 如果获取失败,会让 M 进入休眠状态,被唤醒后重试 if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available resetspinning() } // 执行 goroutine 任务函数 execute(gp, inheritTime) }
有关lockedg的细节,参见后文。
调度函数获取可用的G后,交由execute去执行。同时,它还检查环境开关来决定是否参与垃圾回收。
把相关细节放下,先走完整个调度循环再说。
proc1.go
func execute(gp *g, inheritTime bool) { g := getg() casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard g.m.curg = gp gp.m = g.m gogo(&gp.sched) }
真正关键的就是汇编实现的gogo函数。它从g0栈切换到G栈,然后用一个JMP指令进入G任务函数代码。
asm_amd64.s
TEXT runtime•gogo(SB), NOSPLIT, 0, gobuf_sp(BX) // clear to help garbage collector
MOVQ 0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
MOVQ gobuf_pc(BX), BX // 获取 G 任务函数地址
JMP BX // 执行
这里有个细节,JMP并不是CALL,也就是说不会将PC/IP入栈,那么执行完任务函数后,RET指令恢复的PC/IP值是什么?我们在schedule、execute里也没看到goexit调用,究竟如何再次进入调度循环呢?
在newproc1创建G任务时,我们曾忽略了一个细节。
proc1.go
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g { newg.sched.sp = sp // 此处保存的是 goexit 地址 newg.sched.pc = funcPC(goexit) + _PCQuantum newg.sched.g = guintptr(unsafe.Pointer(newg)) // 此处的调用是关键所在 gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.startpc = fn.fn }
在初始化G.sched时,pc保存的是goexit而非fn。关键秘密就是随后调用的gostartcallfn函数。
stack1.go
func gostartcallfn(gobuf *gobuf, fv *funcval) { gostartcall(gobuf, fn, (unsafe.Pointer)(fv)) }
sys_x86.go
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) { // 调整 sp sp := buf.sp if regSize > ptrSize { sp -= ptrSize *(*uintptr)(unsafe.Pointer(sp)) = 0 } sp -= ptrSize // 将 buf.pc 也就是 goexit 入栈 *(*uintptr)(unsafe.Pointer(sp)) = buf.pc // 然后再次设置 sp 和 pc,此时 pc 才是 G 任务函数 buf.sp = sp buf.pc = uintptr(fn) buf.ctxt = ctxt }
ARM使用LR寄存器存储PC值,而非保存在栈上。
很显然,在初始化完成后,G栈顶端被压入了goexit地址。汇编函数gogo JMP跳转执行G任务,那么函数尾部的RET指令必然是将goexit地址恢复到PC/IP,从而实现任务结束清理操作和再次进入调度循环。
asm_amd64.s
TEXT runtime•goexit(SB),NOSPLIT,$0-0 CALL runtime•goexit1(SB) // does not return
proc1.go
func goexit1() { // 切换到 g0 执行 goexit0 mcall(goexit0) } // goexit continuation on g0 func goexit0(gp *g) { g := getg() // 清理 G 状态 casgstatus(gp, _Grunning, _Gdead) gp.m = nil gp.lockedm = nil g.m.lockedg = nil gp.paniconfault = false gp._defer = nil gp._panic = nil gp.writebuf = nil gp.waitreason = "" gp.param = nil dropg() g.m.locked = 0 // 将 G 放回复用链表 gfput(g.m.p.ptr(), gp) // 重新进入调度循环 schedule() }
无论是mcall、systemstack,还是gogo都不会更新g0.sched栈现场。需要切换到g0栈时,直接从“g_sched+gobuf_sp”读取地址恢复SP。所以调用goexit0/schedule时,g0栈又从头开始,原调用堆栈全部失效,就算不返回也无所谓。
在mstart1里调用gosave初始化了g0.sched.sp等数据,
proc1.go
func mstart1() { // Record top of stack for use by mcall. // Once we call schedule we’re never coming back, // so other calls can reuse this stack space. gosave(&g.m.g0.sched) g.m.g0.sched.pc = ^uintptr(0) // make sure it is never used }
asm_amd64.s
// save state in Gobuf; setjmp TEXT runtime•gosave(SB), NOSPLIT, 0, gobuf_ret(AX) MOVQ $0, gobuf_ctxt(AX) MOVQ BP, gobuf_bp(AX) MOVQ g(CX), BX MOVQ BX, gobuf_g(AX) RET
至此,单次任务完整结束,又回到查找待运行G任务的状态,循环往复。
findrunnable
为了找到可以运行的G任务,findrunnable可谓费尽心机。本地队列、全局队列、网络任务(netpoll),甚至是从其他P任务队列偷窃。所有的目的都是为了尽快完成所有任务,充分发挥多核并行能力。
proc1.go
func findrunnable() (gp g, inheritTime bool) { g := getg() top: // 垃圾回收 if sched.gcwaiting != 0 { gcstopm() goto top } // fing 是用来执行 finalizer 的 goroutine if fingwait && fingwake { if gp := wakefing(); gp != nil { ready(gp, 0) } } // 从本地队列获取 if gp, inheritTime := runqget(g.m.p.ptr()); gp != nil { return gp, inheritTime } // 从全局队列获取 if sched.runqsize != 0 { gp := globrunqget(g.m.p.ptr(), 0) if gp != nil { return gp, false } } // 检查 netpoll 任务 if netpollinited() && sched.lastpoll != 0 { if gp := netpoll(false); gp != nil { // non-blocking // 返回的是多任务链表,将其他任务放回全局队列 // gp.schedlink 链表结构 injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } } // 随机挑一个 P,偷些任务 for i := 0; i < int(4gomaxprocs); i++ { if sched.gcwaiting != 0 { goto top } // 随机数取模确定目标 P p := allp[fastrand1()%uint32(gomaxprocs)] var gp g if p == g.m.p.ptr() { // 本地队列 gp, _ = runqget(p) } else { // 如果尝试次数太多,连目标 P.runnext 都偷,这是饿得狠了 stealRunNextG := i > 2int(gomaxprocs) gp = runqsteal(g.m.p.ptr(), p, stealRunNextG) } if gp != nil { return gp, false } } stop: // 检查 GC MarkWorker if p := g.m.p.ptr(); gcBlackenEnabled != 0 && p.gcBgMarkWorker != nil && gcMarkWorkAvailable(p) { p.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := p.gcBgMarkWorker casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } // 再次检查垃圾回收状态 if sched.gcwaiting != 0 || g.m.p.ptr().runSafePointFn != 0 { goto top } // 再次尝试全局队列 if sched.runqsize != 0 { gp := globrunqget(g.m.p.ptr(), 0) return gp, false } // 释放当前 P,取消自旋状态 p := releasep() pidleput(p) if g.m.spinning { g.m.spinning = false xadd(&sched.nmspinning, -1) } // 再次检查所有 P 任务队列 for i := 0; i < int(gomaxprocs); i++ { p := allp[i] if p != nil && !runqempty(p) { // 绑定一个空闲 P,回到头部尝试偷取任务 p = pidleget() if p != nil { acquirep(p) goto top } break } } // 再次检查 netpoll if netpollinited() && xchg64(&sched.lastpoll, 0) != 0 { gp := netpoll(true) // block until new work is available atomicstore64(&sched.lastpoll, uint64(nanotime())) if gp != nil { p = pidleget() if p != nil { acquirep(p) injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } injectglist(gp) } } // 一无所得,休眠 stopm() goto top }
每次看到这里,我都想吐槽一句:这段代码就不能改改?
按照查找流程,我们依次查看不同优先级的获取方式。首先是本地队列,其中P.runnext优先级最高。
proc1.go
func runqget(p *p) (gp *g, inheritTime bool) { // 优先从 runnext 获取 // 循环尝试 cas。为什么用同步操作?因为可能有其他 P 从本地队列偷任务 for { next := p.runnext if next == 0 { break } if p.runnext.cas(next, 0) { return next.ptr(), true } } // 本地队列 for { h := atomicload(&p.runqhead) t := p.runqtail if t == h { return nil, false } // 从头部提取 gp := p.runq[h%uint32(len(p.runq))] if cas(&p.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } }
runnext不会影响schedtick计数,也就是说让schedule执行更多的任务才会去检查全局队列,所以才会有inheritTime=true的说法。
在检查全局队列时,除返回一个可用G外,还会批量转移一批到P本地队列,毕竟不能每次加锁去操作全局队列。
proc1.go
func globrunqget(p *p, max int32) *g { if sched.runqsize == 0 { return nil } // 将全局队列任务等分,计算最多能批量获取的任务数量 n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } if max > 0 && n > max { n = max } // 不能超过 runq 数组长度的一半(128) if n > int32(len(p.runq))/2 { n = int32(len(p.runq)) / 2 } // 调整计数 sched.runqsize -= n if sched.runqsize == 0 { sched.runqtail = 0 } // 返回第一个 G 任务,随后的才是要批量转移到本地的任务 gp := sched.runqhead.ptr() sched.runqhead = gp.schedlink n— for ; n > 0; n— { gp1 := sched.runqhead.ptr() sched.runqhead = gp1.schedlink runqput(p, gp1, false) } return gp }
只有当本地和全局队列都为空时,才会考虑去检查其他P任务队列。这个优先级最低,因为会影响目标P的执行(必须使用原子操作)。
proc1.go
func runqsteal(p, p2 *p, stealRunNextG bool) *g {
t := p.runqtail
// 尝试从 p2 偷取一半任务存入 p 本地队列
n := runqgrab(p2, &p.runq, t, stealRunNextG)
if n == 0 {
return nil
}
// 返回尾部的 G 任务
n—
gp := p.runq[(t+n)%uint32(len(p.runq))]
if n == 0 {
return gp
}
// 调整目标队列尾部状态
atomicstore(&p.runqtail, t+n)
return gp
}
func runqgrab(p *p, batch *[256]*g, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 计算批量转移任务数量
h := atomicload(&p.runqhead)
t := atomicload(&p.runqtail)
n := t - h
n = n - n/2
// 如果没有,那就尝试偷 runnext 吧
if n == 0 {
if stealRunNextG {
if next := p.runnext; next != 0 {
usleep(100)
if !p.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next.ptr()
return 1
}
}
return 0
}
// 数据异常,不可能超过一半值重试
if n > uint32(len(p.runq)/2) { // read inconsistent h and t
continue
}
// 转移任务
for i := uint32(0); i < n; i++ {
g := p.runq[(h+i)%uint32(len(p.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 修改源 P 队列状态
// 失败重试。因为没有修改源和目标队列位置状态,所以没有影响
if cas(&p.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
这就是某份官方文档里提及的Work-Stealing算法。
lockedg
在执行cgo调用时,会用lockOSThread将G锁定在当前线程。
cgocall.go
func cgocall(fn, arg unsafe.Pointer) int32 { /* * Lock g to m to ensure we stay on the same stack if we do a * cgo callback. Add entry to defer stack in case of panic. */ lockOSThread() mp := getg().m mp.ncgocall++ mp.ncgo++ defer endcgo(mp) } func endcgo(mp *m) { mp.ncgo— unlockOSThread() // invalidates mp }
锁定操作很简单,只须设置G.lockedm和M.lockedg即可。
proc.go
func lockOSThread() { getg().m.locked += _LockInternal dolockOSThread() } func dolockOSThread() { g := getg() g.m.lockedg = g g.lockedm = g.m }
当调度函数schedule检查到locked属性时,会适时移交,让正确的M去完成任务。
简单点说,就是lockedm会休眠,直到某人将lockedg交给它。而不幸拿到lockedg的M,则要将lockedg连同P一起传递给lockedm,还负责将其唤醒。至于它自己,则因失去P而被迫休眠,直到wakep带着新的P唤醒它。
proc1.go
func schedule() { g := getg() // 如果当前 M 是 lockedm,那么休眠 // 没有立即 execute(lockedg),是因为该 lockedg 此时可能被其他 M 获取 // 兴许是中途用 gosched 暂时让出 P,进入待运行队列 if g.m.lockedg != nil { stoplockedm() execute(g.m.lockedg, false) // Never returns. } top: … // 如果获取到的 G 是 lockedg,那么将其连同 P 交给 lockedm 去执行 // 休眠,等待唤醒后重新获取可用 G if gp.lockedm != nil { startlockedm(gp) goto top } // 执行 goroutine 任务函数 execute(gp, inheritTime) } func startlockedm(gp *g) { g := getg() mp := gp.lockedm // 移交 P,并唤醒 lockedm p := releasep() mp.nextp.set(p) notewakeup(&mp.park) // 当前 M 休眠 stopm() }
从中可以看出,除lockedg只能由lockedm执行外,lockedm在完成任务或主动解除锁定前也不会执行其他任务。这也是在前面章节我们用cgo生成大量M实例的原因。
proc1.go
func goexit0(gp *g) { g := getg() // 解除锁定设置 gp.m = nil gp.lockedm = nil g.m.lockedg = nil }
可调用UnlockOSThread主动解除锁定,以便允许其他M完成当前任务。
proc1.go
func unlockOSThread() { g := getg() if g.m.locked < _LockInternal { systemstack(badunlockosthread) } g.m.locked -= _LockInternal dounlockOSThread() } func dounlockOSThread() { g := getg() if g.m.locked != 0 { return } g.m.lockedg = nil g.lockedm = nil }